
[SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak #46131

Closed
JacobZheng0927 wants to merge 5 commits into apache:master from JacobZheng0927:zstdMemoryLeak

Conversation

JacobZheng0927
Contributor

@JacobZheng0927 JacobZheng0927 commented Apr 19, 2024

What changes were proposed in this pull request?

Close the stream in DiskBlockObjectWriter#closeResources to avoid a memory leak.

Why are the changes needed?

SPARK-34647 replaced ZstdInputStream with ZstdInputStreamNoFinalizer. As a result, every caller of CompressionCodec.compressedOutputStream must close the stream explicitly, since cleanup is no longer handled by the finalizer mechanism.
When zstd is used for shuffle write compression and the write is interrupted for any reason (e.g. spark.sql.execution.interruptOnCancel is enabled and the job is cancelled), the native memory held by ZstdInputStreamNoFinalizer may never be freed, causing a memory leak.
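As background, here is a minimal, self-contained sketch of the idea behind the fix, not the actual patch to DiskBlockObjectWriter#closeResources: the serialization stream and the underlying compressed stream are both closed, the latter in a finally block, so the codec's native memory is released even when the task was interrupted mid-write. The object and method names below are illustrative only.

```scala
import java.io.Closeable

object CloseResourcesSketch {
  // Close the object-serialization stream first, then always close the
  // underlying compressed stream (e.g. ZstdOutputStreamNoFinalizer), even if
  // the first close throws. Without the finally block, an interrupted task
  // can leave the zstd stream open and leak its native buffers.
  def closeBoth(objOut: Closeable, bs: Closeable): Unit = {
    try {
      if (objOut != null) objOut.close()
    } finally {
      if (bs != null) bs.close()
    }
  }
}
```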

Does this PR introduce any user-facing change?

No

How was this patch tested?

Spark Shell Configuration

```
$> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
$> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
```

Test Script

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
(1 to 50).foreach { batch => {
  val jobA = Future {
    val df1 = spark.range(2000000).map { _ => (Random.nextString(20), Random.nextInt(1000), Random.nextInt(1000), Random.nextInt(10)) }.toDF("a", "b", "c", "d")
    val df2 = spark.range(2000000).map { _ => (Random.nextString(20), Random.nextInt(1000), Random.nextInt(1000), Random.nextInt(10)) }.toDF("a", "b", "c", "d")
    df1.join(df2, "b").show()
  }
  Thread.sleep(5000)
  sc.cancelJobGroup("jobA")
}}
```

Memory Monitor

```
$> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done;
```

Results

Before
```
"2024-05-13 16:54:23",1332384
"2024-05-13 16:54:33",1417112
"2024-05-13 16:54:43",2211684
"2024-05-13 16:54:53",3060820
"2024-05-13 16:55:03",3850444
"2024-05-13 16:55:14",4631744
"2024-05-13 16:55:24",5317200
"2024-05-13 16:55:34",6019464
"2024-05-13 16:55:44",6489180
"2024-05-13 16:55:54",7255548
"2024-05-13 16:56:05",7718728
"2024-05-13 16:56:15",8388392
"2024-05-13 16:56:25",8927636
"2024-05-13 16:56:36",9473412
"2024-05-13 16:56:46",10000380
"2024-05-13 16:56:56",10344024
"2024-05-13 16:57:07",10734204
"2024-05-13 16:57:17",11211900
"2024-05-13 16:57:27",11665524
"2024-05-13 16:57:38",12268976
"2024-05-13 16:57:48",12896264
"2024-05-13 16:57:58",13572244
"2024-05-13 16:58:09",14252416
"2024-05-13 16:58:19",14915560
"2024-05-13 16:58:30",15484196
"2024-05-13 16:58:40",16170324
After
"2024-05-13 16:35:44",1355428
"2024-05-13 16:35:54",1391028
"2024-05-13 16:36:04",1673720
"2024-05-13 16:36:14",2103716
"2024-05-13 16:36:24",2129876
"2024-05-13 16:36:35",2166412
"2024-05-13 16:36:45",2177672
"2024-05-13 16:36:55",2188340
"2024-05-13 16:37:05",2190688
"2024-05-13 16:37:15",2195168
"2024-05-13 16:37:26",2199296
"2024-05-13 16:37:36",2228052
"2024-05-13 16:37:46",2238104
"2024-05-13 16:37:56",2260624
"2024-05-13 16:38:06",2307184
"2024-05-13 16:38:16",2331140
"2024-05-13 16:38:27",2323388
"2024-05-13 16:38:37",2357552
"2024-05-13 16:38:47",2352948
"2024-05-13 16:38:57",2364744
"2024-05-13 16:39:07",2368528
"2024-05-13 16:39:18",2385492
"2024-05-13 16:39:28",2389184
"2024-05-13 16:39:38",2388060
"2024-05-13 16:39:48",2388336
"2024-05-13 16:39:58",2386916

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Apr 19, 2024
@HyukjinKwon HyukjinKwon changed the title SPARK-47910; close stream when DiskBlockObjectWriter closeResources to avoid memory leak [SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak Apr 19, 2024
@JacobZheng0927
Contributor Author

cc @dongjoon-hyun
This is similar to #35613. Please take a look, thanks!

Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you for making a PR, @JacobZheng0927 .

However, your PR fails to compile. Please make GitHub Action CI green.

[error] (core / Compile / compileIncremental) Compilation failed

@JacobZheng0927
Contributor Author

Thank you for making a PR, @JacobZheng0927 .

However, your PR fails to compile. Please make GitHub Action CI green.

[error] (core / Compile / compileIncremental) Compilation failed

Done.

```scala
}
}
}

```

Member

nit. Redundant empty line. Please remove this.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927 ?

```scala
} catch {
  case e: IOException =>
    logError(log"Exception occurred while closing the output stream" +
      log"${MDC(ERROR, e.getMessage)}")
```
Contributor

error -> info
and the exception stack trace would be useful
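A hedged illustration of the second point (not the merged code; the names are made up for the example): passing the exception object to the logger, rather than only e.getMessage, is what gets the stack trace into the log.

```scala
import java.io.IOException
import org.apache.spark.internal.Logging

object StackTraceLoggingSketch extends Logging {
  def onCloseFailure(e: IOException): Unit = {
    // Logging only e.getMessage would drop the stack trace; passing the
    // throwable as the second argument makes the logger print it in full.
    logInfo("Exception occurred while closing the output stream", e)
  }
}
```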

Comment on lines 180 to 183
```scala
objOut = closeIfNonNull(objOut)
bs = null
} {
bs = closeIfNonNull(bs)
```
Contributor

@mridulm mridulm Apr 23, 2024

Suggested change
```diff
-objOut = closeIfNonNull(objOut)
-bs = null
-} {
-bs = closeIfNonNull(bs)
+if (null != objOut) objOut.close()
+bs = null
+} {
+objOut = null
+if (null != bs) bs.close()
+bs = null
```

And remove closeIfNonNull

Contributor Author

OK, I'll fix it.

@JacobZheng0927
Contributor Author

Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927 ?

Ok, I'll try to reproduce the problem with a simple script.

@JacobZheng0927
Contributor Author

I apologize for the long delay in updating. I've just added the steps for reproduction, please take a look. @dongjoon-hyun @mridulm

@mridulm
Contributor

mridulm commented May 13, 2024

@JacobZheng0927, please add it as a unit test.

@JacobZheng0927
Contributor Author

@JacobZheng0927, please add it as a unit test.

I'm not sure how to test for native memory leaks in unit tests. Is there a relevant example I can refer to?

@mridulm
Contributor

mridulm commented May 23, 2024

One way I can quickly think of is to check whether objOut.close() or bs.close() is being called.
For example, adapt the "calling closeAndDelete() on a partial write file" test and use either a custom serializer or a custom compression codec to check whether close is invoked?

(Sorry for the delay in getting back to you - this PR dropped off my todo list unfortunately)
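A hedged sketch of the kind of hook this suggests: a wrapper output stream that records whether close() was called, which a test could plug in via a custom serializer or compression codec and assert on after closeAndDelete(). The class and member names below are hypothetical, not the ones in the merged test.

```scala
import java.io.{FilterOutputStream, OutputStream}

// Hypothetical test helper: wraps any OutputStream and remembers whether
// close() was invoked, so a test can assert that DiskBlockObjectWriter
// actually released it.
class CloseDetectingOutputStream(out: OutputStream) extends FilterOutputStream(out) {
  @volatile var closed: Boolean = false

  override def close(): Unit = {
    closed = true
    super.close()
  }
}
```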

@JacobZheng0927
Contributor Author

SPARK-47910; add a check to see if the close method was called in UT

@mridulm I added some unit test code, but I'm not sure if this is the appropriate way to test it.

Contributor

@mridulm mridulm left a comment

The test looks good to me, just a minor comment to ensure this does not break as code evolves.

```scala
}

trait CloseDetecting {
  var closed = false
```
Contributor

nit:

Suggested change
```diff
-var closed = false
+def isClosed: Boolean
```

Contributor

@mridulm mridulm left a comment

Looks good to me.

+CC @dongjoon-hyun in case you have additional feedback, thanks!

@mridulm
Contributor

mridulm commented Jun 18, 2024

Merging to master and 3.5
@dongjoon-hyun's requested change is resolved as the CI is green.

@mridulm mridulm closed this in e265c60 Jun 18, 2024
mridulm pushed a commit that referenced this pull request Jun 18, 2024
[SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak

Closes #46131 from JacobZheng0927/zstdMemoryLeak.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit e265c60)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
@mridulm
Contributor

mridulm commented Jun 18, 2024

Merged to master and 3.5
Thanks for fixing this @JacobZheng0927 !
Thanks for the review @dongjoon-hyun :-)

@LuciferYang
Contributor

LuciferYang commented Jun 19, 2024

If a PR adds new log statements and needs to be backported, the backport to branch-3.5/branch-3.4 may have to be submitted manually, because the structured logging feature was only added in 4.0 and the original patch will fail to compile on those branches. @JacobZheng0927, do you have time to fix the compilation failure on branch-3.5?

```
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:195:17: value log is not a member of StringContext
[error]         logInfo(log"Exception occurred while closing the output stream" +
[error]                 ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:11: value log is not a member of StringContext
[error]           log"${MDC(ERROR, e.getMessage)}")
[error]           ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:17: not found: value MDC
[error]           log"${MDC(ERROR, e.getMessage)}")
[error]                 ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:21: not found: value ERROR
[error]           log"${MDC(ERROR, e.getMessage)}")
[error]                     ^
[error] four errors found
```
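For reference, a hedged sketch of the shape the branch-3.5 backport has to take (see #47022 below): the log"..." interpolator and MDC exist only in Spark 4.0, so on 3.5 the message falls back to a plain interpolated string. This is an illustration, not the exact backported code.

```scala
import java.io.IOException
import org.apache.spark.internal.Logging

object Spark35CompatibleLogging extends Logging {
  def onCloseFailure(e: IOException): Unit = {
    // Spark 4.0 structured form (does not compile on branch-3.5):
    //   logInfo(log"Exception occurred while closing the output stream " +
    //     log"${MDC(ERROR, e.getMessage)}")
    // branch-3.5 fallback: plain string interpolation, no MDC.
    logInfo(s"Exception occurred while closing the output stream: ${e.getMessage}", e)
  }
}
```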

@LuciferYang
Contributor


Also cc @gengliangwang @panbingkun: should we introduce a pseudo log MDC behavior in branch-3.5?

@panbingkun
Contributor


It seems that structured logs are only available in Spark 4.0.

@JacobZheng0927
Contributor Author


Okay, I will fix it in 3.5.

@LuciferYang
Contributor

Thanks @JacobZheng0927

cloud-fan pushed a commit that referenced this pull request Jun 19, 2024
### What changes were proposed in this pull request?
The pull request #46131 was merged into the 3.5 branch. The log output in this PR uses MDC, which is not yet supported in version 3.5.

### Why are the changes needed?
Remove the use of MDC.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
No need.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47022 from JacobZheng0927/SPARK-47910-3.5.

Authored-by: JacobZheng0927 <zsh517559523@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@mridulm
Contributor

mridulm commented Jun 19, 2024

Oh crap, forgot this is using structured logging ... should have caught it before merging to 3.5!
